use clickhouse_rs::types::{Value, Block, Options};
use crate::data_frame::value::{RealValue};
use crate::sink::{SinkResult, Sink, SinkData};
use clickhouse_rs::{Pool};
use crate::conf_init::{MapResult, MapErr};
use crate::run_plan::{HandResult};
use crate::indicators::{indicators_key_ref, Op, IndicatorsMode};
use tokio_clickhouse::prelude::future::Future;
use tokio_clickhouse::runtime::Runtime as CKRuntime;
use crate::run_plan::func::RunPlanExprFunc;
use crate::logical_plan::expr::func::{logical_plan_expr_func_map_to_run_plan};
use tokio::runtime::Runtime;
use std::str::FromStr;
use std::collections::VecDeque;
use vecshard::{ShardExt, VecShard};
use std::time::Duration;

/// Deserialized configuration for one ClickHouse sink.
#[derive(Deserialize, Debug)]
pub struct ClickhouseConf {
    /// Connection URL handed to `Options::from_str`, e.g.
    /// `tcp://127.0.0.1:9000/tz?compression=lz4&ping_timeout=42ms&pool_max=500`.
    pub server: String,
    /// Database name set on the connection options.
    pub database: String,
    /// Table that blocks are inserted into.
    pub table: String,
    /// Thread count used for both the core and the blocking threads of the
    /// sink's dedicated runtime.
    pub threads: usize,
    /// Optional column mappings as `(source expression, column name, nullable)`.
    pub pair_list: Option<Vec<(String, String, bool)>>,
}

/// One column mapping: `.0` is the compiled expression evaluated against each
/// record, `.1` is the destination column name, and `.2` marks the column as
/// nullable.
#[derive(Debug, Clone)]
pub struct ElementPair(Box<RunPlanExprFunc>, String, bool);

/// Sink that converts records into ClickHouse blocks and inserts them.
#[derive(Debug)]
pub struct ClickHouse {
    /// Sink name; passed as the key scope of every indicator update.
    name: String,
    /// Copy of the configured server URL.
    server: String,
    /// Connection pool the insert handles are taken from.
    pool: Pool,
    /// Dedicated runtime on which the insert futures are driven.
    runtime: CKRuntime,
    /// Copy of the configured thread count.
    threads: usize,
    /// Pre-formatted `use <database>` statement text.
    use_db: String,
    /// Destination table for inserts.
    table: String,
    /// Master copy of the column mappings built from the configuration.
    pair_list: Vec<ElementPair>,
    /// Spare mapping lists handed back by finished conversion tasks.
    pair_list_cache: Vec<Vec<ElementPair>>,
}

impl ClickHouse {
    /// Hands out a mapping list for a conversion task: a recycled one when the
    /// cache has any, otherwise a fresh clone of the master list.
    fn pair_list_cache(&mut self) -> Vec<ElementPair> {
        self.pair_list_cache
            .pop()
            .unwrap_or_else(|| self.pair_list.clone())
    }

    /// Returns a mapping list to the cache so a later task can reuse it.
    fn pair_list_cache_recycle(&mut self, t: Vec<ElementPair>) {
        self.pair_list_cache.push(t);
    }

    /// Builds a ClickHouse sink from its configuration.
    ///
    /// Every configured `(expression, column, nullable)` triple is compiled
    /// into an [`ElementPair`], the connection pool is created from
    /// `conf.server`, and a dedicated runtime with `conf.threads` core and
    /// blocking threads is started.
    ///
    /// # Errors
    ///
    /// Returns an error when an expression fails to compile, when
    /// `conf.server` cannot be parsed as connection options, or when the
    /// runtime cannot be built.
    pub fn new(name: &str, conf: &ClickhouseConf) -> MapResult<Box<dyn Sink>> {
        let mut pair_list = Vec::new();

        if let Some(pairs) = &conf.pair_list {
            for (expr, column, nullable) in pairs.iter() {
                let source = logical_plan_expr_func_map_to_run_plan(expr.as_str())?;

                pair_list.push(ElementPair(source, column.clone(), *nullable));
            }
        }

        // The server URL comes from user configuration, so a malformed value
        // is reported as a configuration error instead of panicking.
        let options = Options::from_str(conf.server.as_str())
            .map_err(|e| MapErr::new(format!("ck server url err {:?}", e)))?
            .pool_min(50)
            .pool_max(500)
            .database(conf.database.as_str())
            .ping_before_query(false)
            .keepalive(Some(Duration::from_secs(1)));

        let pool = Pool::new(options);

        let runtime = tokio_clickhouse::runtime::Builder::new()
            .name_prefix("ck")
            .core_threads(conf.threads)
            .blocking_threads(conf.threads)
            .build()
            .map_err(|e| MapErr::new(format!("ck runtime err {:?}", e)))?;

        Ok(Box::new(ClickHouse {
            name: name.to_string(),
            server: conf.server.clone(),
            pool,
            runtime,
            threads: conf.threads,
            use_db: format!("use {}", conf.database),
            table: conf.table.clone(),
            pair_list,
            pair_list_cache: Vec::new(),
        }))
    }

    /// Converts a shard of records into one ClickHouse block.
    ///
    /// For every record each mapping is evaluated and its value converted with
    /// `real_value_map_to_clickhouse_value`. A failure on a non-nullable
    /// column drops the whole record; a failure on a nullable column only
    /// skips that column. Every failure and every dropped record is counted
    /// through `indicators_key_ref` under the sink `name`.
    ///
    /// Returns the block together with `pair_list`, so the caller can recycle
    /// the mapping list.
    pub async fn trans(name: String, mut pair_list: Vec<ElementPair>, mut data_list: VecShard<SinkData>) -> (Block, Vec<ElementPair>) {
        let name = name.as_str();
        let mut block = Block::new();

        for data in data_list.iter_mut() {
            let mut row = Vec::new();
            let mut row_jump = false;

            for pair in pair_list.iter_mut() {
                // On success the column is pushed and the next mapping is
                // processed; otherwise pick the indicator keys for the failure.
                let (err_key, non_nullable_key, nullable_key) = match pair.0.evaluate_single(data).await {
                    HandResult::Ok(value) => match real_value_map_to_clickhouse_value(&value, pair.2) {
                        Some(ck_value) => {
                            trace!("row {} {:?}", pair.1, ck_value);
                            row.push((pair.1.clone(), ck_value));
                            continue;
                        },
                        None => ("trans err", "trans err NonNullable", "trans err Nullable"),
                    },
                    _ => ("get err", "get err NonNullable", "get err Nullable"),
                };

                indicators_key_ref(Op::Add, IndicatorsMode::Sink, name, err_key, 1);
                if pair.2 {
                    // NOTE(review): the nullable column is simply left out of
                    // this row; confirm `Block::push` tolerates rows that miss
                    // a column other rows carry.
                    indicators_key_ref(Op::Add, IndicatorsMode::Sink, name, nullable_key, 1);
                } else {
                    indicators_key_ref(Op::Add, IndicatorsMode::Sink, name, non_nullable_key, 1);
                    row_jump = true;
                    break;
                }
            }

            if row_jump {
                indicators_key_ref(Op::Add, IndicatorsMode::Sink, name, "drop row trans", 1);
            } else if block.push(row).is_err() {
                indicators_key_ref(Op::Add, IndicatorsMode::Sink, name, "drop row", 1);
            }
        }

        (block, pair_list)
    }
}

#[async_trait]
impl Sink for ClickHouse {
    /// No start-up work is performed; always returns `true`.
    async fn init(&mut self) -> bool {
        true
    }

    /// Converts and inserts the given records.
    ///
    /// Each inner vector is cut into shards of at most `burst` records, and
    /// every shard is converted into a block by a `ClickHouse::trans` task
    /// spawned on `runtime`. A `burst` of zero means "do not split". The tasks
    /// are then awaited in spawn order and each non-empty block is inserted
    /// into the configured table. Inserts run one at a time: each insert
    /// future is driven to completion with `block_on` on the sink's own
    /// runtime before the next task is awaited.
    ///
    /// Failed conversions and failed inserts are logged and counted through
    /// the indicators; the method itself always returns `Ok(())`.
    async fn store(&mut self, runtime: &Runtime, data_list: Vec<Vec<SinkData>>, burst: usize, _concurrent: usize) -> SinkResult {
        let mut join_set = VecDeque::new();

        for data in data_list.into_iter() {
            // Splitting at the full length turns the whole vector into a shard.
            let total = data.len();
            let (whole, _empty) = data.split_inplace_at(total);
            let mut pending = Some(whole);

            while let Some(shard) = pending.take() {
                // Peel off one burst while more than a burst remains. The
                // `burst > 0` guard matters: splitting at zero never shrinks
                // the remainder and would spawn tasks forever.
                let piece = if burst > 0 && shard.len() > burst {
                    let (head, tail) = shard.split_inplace_at(burst);
                    pending = Some(tail);
                    head
                } else {
                    shard
                };

                // An empty shard can only yield an empty block; skip the task.
                if piece.len() == 0 {
                    continue;
                }

                let name = self.name.clone();
                let pair_list = self.pair_list_cache();
                join_set.push_back(runtime.spawn(
                    ClickHouse::trans(name, pair_list, piece)
                ));
            }
        }

        while let Some(t) = join_set.pop_front() {
            let (block, pair_list) = match t.await {
                Ok(done) => done,
                Err(e) => {
                    // The task did not finish, so its records and its mapping
                    // list are lost; a fresh list is cloned on demand later.
                    warn!("clickhouse trans task failed [{:?}]", e);
                    continue;
                },
            };

            self.pair_list_cache_recycle(pair_list);

            if block.is_empty() {
                continue;
            }

            let rows = block.row_count();
            let table = self.table.clone();

            let done = self.pool
                .get_handle()
                .and_then(move |c| c.insert(table, block))
                .and_then(move |_| {
                    Ok(())
                })
                .map_err(|e| {
                    format!("{:?}", e)
                });

            match self.runtime.block_on(done) {
                Ok(_) => {
                    indicators_key_ref(Op::Add, IndicatorsMode::Sink, self.name.as_str(), "insert block", 1);
                    indicators_key_ref(Op::Add, IndicatorsMode::Sink, self.name.as_str(), "insert row", rows);
                },
                Err(e) => {
                    warn!("clickhouse insert failed [{:?}]", e);
                    indicators_key_ref(Op::Add, IndicatorsMode::Sink, self.name.as_str(), "drop block", 1);
                    indicators_key_ref(Op::Add, IndicatorsMode::Sink, self.name.as_str(), "drop row", rows);
                },
            }
        }

        SinkResult::Ok(())
    }
}

pub fn real_value_map_to_clickhouse_value(value: &RealValue, is_nullable: bool)->Option<Value>{
    match value{
        RealValue::U8(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::U16(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::U32(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::U64(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::I8(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::I16(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::I32(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::I64(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::F32(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::F64(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::Utf8(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(v.as_str()))
                }else{
                    Value::from(v.as_str())
                }
            )
        },
        RealValue::Boolean(_v) => {None},
        RealValue::None => {None},
        RealValue::Bytes(_v) => {None},
        RealValue::Range(_v1,_v2) => {None},
        RealValue::DateTime(v) => {
            Some(
                if is_nullable{
                    Value::from(Some(*v))
                }else{
                    Value::from(*v)
                }
            )
        },
        RealValue::Duration(_) => {None},
    }
}
